package main

import (
	"encoding/json"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
)

type consumerGroupHandler struct {
	logger        log.Logger
	CurrentOffset int64
	ACKMap        map[int64]*MessageInfo
	MessagesChan  chan *OffsetMessage
	ACKChan       chan *MessageInfo
}

func NewConsumerGroupHandler(logger log.Logger, ch chan *OffsetMessage, ackChan chan *MessageInfo) *consumerGroupHandler {
	return &consumerGroupHandler{
		logger:        logger,
		CurrentOffset: 0,
		ACKMap:        make(map[int64]*MessageInfo, 1000),
		MessagesChan:  ch,
		ACKChan:       ackChan,
	}
}

func (c *consumerGroupHandler) ConsumeMessage(msg *sarama.ConsumerMessage) *OffsetMessage {
	offsetMessage := &OffsetMessage{}
	_ = json.Unmarshal(msg.Value, offsetMessage)
	offsetMessage.Topic = &msg.Topic
	offsetMessage.Partition = &msg.Partition
	offsetMessage.Offset = &msg.Offset
	return offsetMessage
}

func (c *consumerGroupHandler) Loop(sess sarama.ConsumerGroupSession) {
	for {
		msgInfo, ok := c.ACKMap[c.CurrentOffset]
		if ok {
			sess.MarkOffset(*msgInfo.Topic, *msgInfo.Partition, *msgInfo.Offset, "")
			_ = c.logger.Log("ACK", "true", "Offset", *msgInfo.Offset)
			delete(c.ACKMap, c.CurrentOffset)
			c.CurrentOffset = c.CurrentOffset + 1
		} else {
			return
		}
	}
}

func (c *consumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
	return nil
}

func (c *consumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
	return nil
}

// consume message and ACK
func (c *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	go func() {
		for {
			msgInfo := <-c.ACKChan
			if c.CurrentOffset == *msgInfo.Offset {
				sess.MarkOffset(*msgInfo.Topic, *msgInfo.Partition, *msgInfo.Offset, "")
				_ = c.logger.Log("ACK", "true", "Offset", *msgInfo.Offset)
				c.CurrentOffset = c.CurrentOffset + 1
				c.Loop(sess)
			} else {
				c.ACKMap[*msgInfo.Offset] = msgInfo
			}
		}
	}()
	for msg := range claim.Messages() {
		c.CurrentOffset = *c.ConsumeMessage(msg).Offset
		c.MessagesChan <- c.ConsumeMessage(msg)
		_ = c.logger.Log("topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset)
		break
	}
	for msg := range claim.Messages() {
		c.MessagesChan <- c.ConsumeMessage(msg)
		_ = c.logger.Log("topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset)
	}
	return nil
}
